package com.fwmagic.dynamic_rule.functions;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

/**
 * Factory helpers for the streaming sources used by the job.
 */
public class SourceFunctions {

    /** Kafka broker list every source created by this class connects to. */
    private static final String BOOTSTRAP_SERVERS = "hd1:9092,hd2:9092,hd3:9092";

    /** Value passed to Kafka's {@code auto.offset.reset} consumer property. */
    private static final String AUTO_OFFSET_RESET = "latest";

    /**
     * Creates a Kafka consumer source that reads the given topic and
     * deserializes each record with {@link SimpleStringSchema}.
     *
     * <p>Only {@code bootstrap.servers} and {@code auto.offset.reset} are
     * configured here; no {@code group.id} is set by this method.
     *
     * @param topic name of the Kafka topic to consume
     * @return a new {@code FlinkKafkaConsumer<String>} for {@code topic}
     */
    public static FlinkKafkaConsumer<String> createKafkaSource(String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("auto.offset.reset", AUTO_OFFSET_RESET);

        // Diamond operator instead of the raw type: keeps the element type
        // checked at compile time and avoids an unchecked-conversion warning.
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }
}
